/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include "velox/common/base/BitUtil.h"
#include "velox/common/base/Exceptions.h"
#include "velox/vector/TypeAliases.h"

#include <folly/Range.h>
#include <xsimd/config/xsimd_config.hpp> // @manual

namespace facebook::velox::dwio::common {

using RowSet = folly::Range<const facebook::velox::vector_size_t*>;

static constexpr const uint64_t kPdepMask8[] = {
    0x0000000000000000,
    0x0101010101010101,
    0x0303030303030303,
    0x0707070707070707,
    0x0f0f0f0f0f0f0f0f,
    0x1f1f1f1f1f1f1f1f,
    0x3f3f3f3f3f3f3f3f,
    0x7f7f7f7f7f7f7f7f,
    0xffffffffffffffff};

// Pdep instruction masks for uint16_t. Generated by generatePdepMasks(16).
static constexpr const uint64_t kPdepMask16[] = {
    0x0000000000000000,
    0x0001000100010001,
    0x0003000300030003,
    0x0007000700070007,
    0x000f000f000f000f,
    0x001f001f001f001f,
    0x003f003f003f003f,
    0x007f007f007f007f,
    0x00ff00ff00ff00ff,
    0x01ff01ff01ff01ff,
    0x03ff03ff03ff03ff,
    0x07ff07ff07ff07ff,
    0x0fff0fff0fff0fff,
    0x1fff1fff1fff1fff,
    0x3fff3fff3fff3fff,
    0x7fff7fff7fff7fff,
    0xffffffffffffffff};

// Pdep instruction masks for uint32_t. Generated by generatePdepMasks(32).
static constexpr const uint64_t kPdepMask32[] = {
    0x0000000000000000, 0x0000000100000001, 0x0000000300000003,
    0x0000000700000007, 0x0000000f0000000f, 0x0000001f0000001f,
    0x0000003f0000003f, 0x0000007f0000007f, 0x000000ff000000ff,
    0x000001ff000001ff, 0x000003ff000003ff, 0x000007ff000007ff,
    0x00000fff00000fff, 0x00001fff00001fff, 0x00003fff00003fff,
    0x00007fff00007fff, 0x0000ffff0000ffff, 0x0001ffff0001ffff,
    0x0003ffff0003ffff, 0x0007ffff0007ffff, 0x000fffff000fffff,
    0x001fffff001fffff, 0x003fffff003fffff, 0x007fffff007fffff,
    0x00ffffff00ffffff, 0x01ffffff01ffffff, 0x03ffffff03ffffff,
    0x07ffffff07ffffff, 0x0fffffff0fffffff, 0x1fffffff1fffffff,
    0x3fffffff3fffffff, 0x7fffffff7fffffff, 0xffffffffffffffff};

static const uint32_t BITPACK_MASKS[] = {
    0,          1,          3,         7,         15,        31,
    63,         127,        255,       511,       1023,      2047,
    4095,       8191,       16383,     32767,     65535,     131071,
    262143,     524287,     1048575,   2097151,   4194303,   8388607,
    16777215,   33554431,   67108863,  134217727, 268435455, 536870911,
    1073741823, 2147483647, 4294967295};

/// Copies bit fields starting at 'bitOffset'th bit of 'bits' into
/// 'result'.  The indices of the fields are in 'rows' and their
/// bit-width is 'bitWidth'.  'rowBias' is subtracted from each
/// index in 'rows' before calculating the bit field's position. The
/// bit fields are considered little endian. 'bufferEnd' is the address of the
/// first undefined byte after the buffer containing the bits. If non-null,
/// extra-wide memory accesses will not be used at thee end of the range to
/// stay under 'bufferEnd'.
template <typename T>
void unpack(
    const uint64_t* FOLLY_NULLABLE bits,
    int32_t bitOffset,
    RowSet rows,
    int32_t rowBias,
    uint8_t bitWidth,
    const char* FOLLY_NULLABLE bufferEnd,
    T* FOLLY_NONNULL result);

/// Unpack numValues number of input values from inputBuffer. The results
/// will be written to result. numValues must be a multiple of 8. The
/// caller needs to make sure the inputBufferLen contains at least numValues
/// number of packed values. The inputBits and result pointers will be updated
/// to the next to read position after this call.
template <typename T>
static inline uint32_t unpackNaive(
    const uint8_t* FOLLY_NONNULL& inputBits,
    uint64_t inputBufferLen,
    uint64_t numValues,
    uint8_t bitWidth,
    T* FOLLY_NONNULL& result);

/// Unpack numValues number of input values from inputBuffer. The results
/// will be written to result. numValues must be a multiple of 8. The
/// caller needs to make sure the inputBufferLen contains at least numValues
/// number of packed values. The inputBits and result pointers will be updated
/// to the next to read position after this call.
template <typename T>
inline void unpack(
    const uint8_t* FOLLY_NONNULL& inputBits,
    uint64_t inputBufferLen,
    uint64_t numValues,
    uint8_t bitWidth,
    T* FOLLY_NONNULL& result) {
  unpackNaive<T>(inputBits, inputBufferLen, numValues, bitWidth, result);
}

template <>
inline void unpack<uint8_t>(
    const uint8_t* FOLLY_NONNULL& inputBits,
    uint64_t inputBufferLen,
    uint64_t numValues,
    uint8_t bitWidth,
    uint8_t* FOLLY_NONNULL& result);

template <>
inline void unpack<uint16_t>(
    const uint8_t* FOLLY_NONNULL& inputBits,
    uint64_t inputBufferLen,
    uint64_t numValues,
    uint8_t bitWidth,
    uint16_t* FOLLY_NONNULL& result);

template <>
inline void unpack<uint32_t>(
    const uint8_t* FOLLY_NONNULL& inputBits,
    uint64_t inputBufferLen,
    uint64_t numValues,
    uint8_t bitWidth,
    uint32_t* FOLLY_NONNULL& result);

// The function definitions are put here to make sure they are inlined. Moving
// them to the .cpp file may result in 10x regression.

template <typename T>
static inline uint32_t unpackNaive(
    const uint8_t* FOLLY_NONNULL& inputBits,
    uint64_t inputBufferLen,
    uint64_t numValues,
    uint8_t bitWidth,
    T* FOLLY_NONNULL& result) {
  VELOX_CHECK(bitWidth >= 1 && bitWidth <= sizeof(T) * 8);
  VELOX_CHECK(inputBufferLen * 8 >= bitWidth * numValues);

  auto mask = BITPACK_MASKS[bitWidth];

  uint64_t bitPosition = 0;
  for (uint32_t i = 0; i < numValues; i++) {
    T val = (*inputBits >> bitPosition) & mask;
    bitPosition += bitWidth;
    while (bitPosition > 8) {
      inputBits++;
      val |= (*inputBits << (8 - (bitPosition - bitWidth))) & mask;
      bitPosition -= 8;
    }
    result[i] = val;
  }
  return numValues;
}

#if XSIMD_WITH_AVX2

// numValues number of uint16_t values with bitWidth in
//  [1, 4] range.
static inline void unpack1to4(
    uint8_t bitWidth,
    const uint8_t* FOLLY_NONNULL& inputBuffer,
    uint64_t numValues,
    uint16_t* FOLLY_NONNULL& outputBuffer) {
  uint64_t pdepMask = kPdepMask8[bitWidth];

  uint64_t numBytesPerTime = (bitWidth * 16 + 7) / 8;
  uint64_t shift = bitWidth * 8;
  alignas(16) uint64_t intermediateValues[2];
  auto writeEndOffset = outputBuffer + numValues;

  // Process 2 * bitWidth bytes (16 values) a time.
  while (outputBuffer + 16 <= writeEndOffset) {
    uint64_t val = *reinterpret_cast<const uint64_t*>(inputBuffer);

    intermediateValues[0] = _pdep_u64(val, pdepMask);
    intermediateValues[1] = _pdep_u64(val >> shift, pdepMask);
    __m256i result = _mm256_cvtepu8_epi16(
        _mm_load_si128(reinterpret_cast<const __m128i*>(intermediateValues)));
    _mm256_storeu_si256(reinterpret_cast<__m256i*>(outputBuffer), result);

    inputBuffer += numBytesPerTime;
    outputBuffer += 16;
  }

  // Finish the last batch which has < 8 bytes. Now Process 8 values a time.
  uint64_t val = 0;
  while (outputBuffer + 8 <= writeEndOffset) {
    std::memcpy(&val, inputBuffer, bitWidth);

    uint64_t intermediateValue = _pdep_u64(val, pdepMask);
    __m256i result = _mm256_cvtepu8_epi16(_mm_loadl_epi64(
        (reinterpret_cast<const __m128i*>(&intermediateValue))));
    _mm256_storeu_si256(reinterpret_cast<__m256i*>(outputBuffer), result);

    inputBuffer += bitWidth;
    outputBuffer += 8;
  }

  numValues = writeEndOffset - outputBuffer;
  unpackNaive(
      inputBuffer,
      (bitWidth * numValues + 7) / 8,
      numValues,
      bitWidth,
      outputBuffer);
}

// Unpack numValues number of uint16_t values with bitWidth in [5, 8] range.
// Note that for bitWidth = 8, this solution is about the same as the simple
// 8->16 lane width conversion solution without pdep on Intel i9 compiled by
// clang.
static inline void unpack5to8(
    const uint8_t bitWidth,
    const uint8_t* FOLLY_NONNULL& inputBuffer,
    uint64_t numValues,
    uint16_t* FOLLY_NONNULL& outputBuffer) {
  uint64_t pdepMask = kPdepMask8[bitWidth];

  auto writeEndOffset = outputBuffer + numValues;
  alignas(16) uint64_t intermediateValues[2];

  // Process 2 * bitWidth bytes (16 values) a time.
  while (outputBuffer + 16 <= writeEndOffset) {
    uint64_t value1 = 0;
    std::memcpy(&value1, inputBuffer, bitWidth);
    intermediateValues[0] = _pdep_u64(value1, pdepMask);

    uint64_t value2 = 0;
    std::memcpy(&value2, inputBuffer + bitWidth, bitWidth);
    intermediateValues[1] = _pdep_u64(value2, pdepMask);

    __m256i result = _mm256_cvtepu8_epi16(
        _mm_load_si128(reinterpret_cast<const __m128i*>(intermediateValues)));
    _mm256_storeu_si256(reinterpret_cast<__m256i*>(outputBuffer), result);

    inputBuffer += bitWidth * 2;
    outputBuffer += 16;
  }

  // Finish the last batch which has < 16 bytes. Now Process bitWidth
  // bytes (8 values) a time.
  while (outputBuffer + 8 <= writeEndOffset) {
    uint64_t value = 0;
    std::memcpy(&value, inputBuffer, bitWidth);
    uint64_t intermediateValue = _pdep_u64(value, pdepMask);

    __m256i result = _mm256_cvtepu8_epi16(_mm_loadl_epi64(
        (reinterpret_cast<const __m128i*>(&intermediateValue))));
    _mm256_storeu_si256(reinterpret_cast<__m256i*>(outputBuffer), result);

    inputBuffer += bitWidth;
    outputBuffer += 8;
  }

  numValues = writeEndOffset - outputBuffer;
  unpackNaive(
      inputBuffer,
      (bitWidth * numValues + 7) / 8,
      numValues,
      bitWidth,
      outputBuffer);
}

// Unpack numValues number of uint16_t values with bitWidth = 8.
static inline void unpack8_cast(
    const uint8_t* FOLLY_NONNULL& inputBuffer,
    uint64_t numValues,
    uint16_t* FOLLY_NONNULL& outputBuffer) {
  auto writeEndOffset = outputBuffer + numValues;

  alignas(16) uint64_t vals[2];

  // Process bitWidth bytes (16 values) a time.
  while (outputBuffer + 16 <= writeEndOffset) {
    vals[0] = *reinterpret_cast<const uint64_t*>(inputBuffer);
    vals[1] = *reinterpret_cast<const uint64_t*>(inputBuffer + 8);

    __m256i result = _mm256_cvtepu8_epi16(*((const __m128i*)vals));
    _mm256_storeu_si256(reinterpret_cast<__m256i*>(outputBuffer), result);

    inputBuffer += 16;
    outputBuffer += 16;
  }

  // Finish the last batch which has < 16 bytes. Now process 8
  // bytes (8 values) a time.
  uint64_t val = 0;
  while (outputBuffer + 8 <= writeEndOffset) {
    std::memcpy(&val, inputBuffer, 8);

    __m256i result =
        _mm256_cvtepu8_epi16(_mm_loadl_epi64((const __m128i*)&val));
    _mm256_storeu_si256(reinterpret_cast<__m256i*>(outputBuffer), result);

    inputBuffer += 8;
    outputBuffer += 8;
  }

  numValues = writeEndOffset - outputBuffer;
  unpackNaive(inputBuffer, numValues, numValues, 8, outputBuffer);
}

// Unpack numValues number of uint16_t values with bitWidth in {9, 11, 13,
// 15}.
static inline void unpack9to15(
    const uint8_t bitWidth,
    const uint8_t* FOLLY_NONNULL& inputBuffer,
    uint64_t numValues,
    uint16_t* FOLLY_NONNULL& outputBuffer) {
  uint64_t pdepMask = kPdepMask16[bitWidth];

  uint8_t bytes2 = bitWidth / 2;
  uint8_t bytes1 = bitWidth - bytes2;

  uint8_t shift1 = bitWidth * 4;
  uint8_t shift2 = bytes1 * 8 - shift1;
  uint64_t valueMask = (1L << shift1) - 1;

  // Process bitWidth bytes (2 * 4 values) a time.
  auto writeEndOffset = outputBuffer + numValues;
  while (outputBuffer + 8 <= writeEndOffset) {
    // Process the first part of bytes1 bytes.
    uint64_t value1 = 0;
    std::memcpy(&value1, inputBuffer, bytes1);
    *reinterpret_cast<uint64_t*>(outputBuffer) =
        _pdep_u64(value1 & valueMask, pdepMask);

    // Process the second part of bytes2 bytes.
    uint64_t value2 = 0;
    std::memcpy(&value2, inputBuffer + bytes1, bytes2);
    *reinterpret_cast<uint64_t*>(outputBuffer + 4) =
        _pdep_u64((value1 >> shift1) | (value2 << shift2), pdepMask);

    inputBuffer += bitWidth;
    outputBuffer += 8;
  }

  numValues = writeEndOffset - outputBuffer;
  unpackNaive(
      inputBuffer,
      (bitWidth * numValues + 7) / 8,
      numValues,
      bitWidth,
      outputBuffer);
}

// Unpack numValues number of uint16_t values with bitWidth = 16
static inline void unpack16(
    const uint8_t* FOLLY_NONNULL& inputBuffer,
    uint64_t numValues,
    uint16_t* FOLLY_NONNULL& outputBuffer) {
  uint64_t numBytes = numValues * 2;
  std::memcpy(outputBuffer, inputBuffer, numBytes);

  inputBuffer += numBytes;
  outputBuffer += numBytes;
}

// Unpack numValues number of uint32_t values with bitWidth in [5, 8] range.
static inline void unpack1to7(
    uint8_t bitWidth,
    const uint8_t* FOLLY_NONNULL& inputBuffer,
    uint64_t numValues,
    uint32_t* FOLLY_NONNULL& outputBuffer) {
  uint64_t pdepMask = kPdepMask8[bitWidth];

  auto writeEndOffset = outputBuffer + numValues;

  // Process bitWidth bytes (8 values) a time.
  while (outputBuffer + 8 <= writeEndOffset) {
    uint64_t val = *reinterpret_cast<const uint64_t*>(inputBuffer);

    uint64_t intermediateVal = _pdep_u64(val, pdepMask);
    __m256i result = _mm256_cvtepu8_epi32(
        _mm_loadl_epi64(reinterpret_cast<const __m128i*>(&intermediateVal)));
    _mm256_storeu_si256(reinterpret_cast<__m256i*>(outputBuffer), result);

    inputBuffer += bitWidth;
    outputBuffer += 8;
  }

  numValues = writeEndOffset - outputBuffer;
  unpackNaive(
      inputBuffer,
      (bitWidth * numValues + 7) / 8,
      numValues,
      bitWidth,
      outputBuffer);
}

// Unpack numValues number of uint32_t values with bitWidth in [5, 8] range.
static inline void unpack1to7_shuffle(
    uint8_t bitWidth,
    const uint8_t* FOLLY_NONNULL& inputBuffer,
    uint64_t numValues,
    uint32_t* FOLLY_NONNULL& outputBuffer) {
  uint64_t pdepMask = kPdepMask8[bitWidth];

  auto writeEndOffset = outputBuffer + numValues;
  __m256i mask = _mm256_set_epi32(0, 1, 2, 3, 0, 1, 2, 3);

  // Process bitWidth bytes (8 values) a time.
  while (outputBuffer + 8 <= writeEndOffset) {
    uint64_t val = *reinterpret_cast<const uint64_t*>(inputBuffer);

    uint64_t intermediateVal = _pdep_u64(val, pdepMask);

    __m256i intermediateVec = _mm256_set_epi64x(intermediateVal, 0, 0, 0);
    __m256i result = _mm256_shuffle_epi8(intermediateVec, mask);
    _mm256_storeu_si256(reinterpret_cast<__m256i*>(outputBuffer), result);

    inputBuffer += bitWidth;
    outputBuffer += 8;
  }

  numValues = writeEndOffset - outputBuffer;
  unpackNaive(
      inputBuffer,
      (bitWidth * numValues + 7) / 8,
      numValues,
      bitWidth,
      outputBuffer);
}

// Unpack numValues number of uint32_t values with bitWidth = 8.
static inline void unpack8(
    const uint8_t* FOLLY_NONNULL& inputBuffer,
    uint64_t numValues,
    uint32_t* FOLLY_NONNULL& outputBuffer) {
  auto writeEndOffset = outputBuffer + numValues;

  // Process 8 bytes (8 values) a time.
  while (outputBuffer + 8 <= writeEndOffset) {
    uint64_t value = *(reinterpret_cast<const uint64_t*>(inputBuffer));
    __m128i packed = _mm_set_epi64x(0, value);
    __m256i result = _mm256_cvtepu8_epi32(packed);
    _mm256_storeu_si256(reinterpret_cast<__m256i*>(outputBuffer), result);

    inputBuffer += 8;
    outputBuffer += 8;
  }

  numValues = writeEndOffset - outputBuffer;
  unpackNaive(inputBuffer, numValues, numValues, 8, outputBuffer);
}

// Unpack numValues number of uint32_t values with bitWidth in [9, 15] range.
static inline void unpack9to15(
    uint8_t bitWidth,
    const uint8_t* FOLLY_NONNULL& inputBuffer,
    uint64_t numValues,
    uint32_t* FOLLY_NONNULL& outputBuffer) {
  uint64_t pdepMask = kPdepMask16[bitWidth];

  uint8_t shift1 = bitWidth * 4;
  uint8_t shift2 = 64 - shift1;

  alignas(16) uint64_t values[2] = {0, 0};
  alignas(16) uint64_t intermediateValues[2];

  auto writeEndOffset = outputBuffer + numValues;

  // Process bitWidth bytes (8 values) a time.
  while (outputBuffer + 8 <= writeEndOffset) {
    std::memcpy(values, inputBuffer, bitWidth);

    intermediateValues[0] = _pdep_u64(values[0], pdepMask);
    intermediateValues[1] =
        _pdep_u64((values[0] >> shift1) | (values[1] << shift2), pdepMask);

    __m256i result = _mm256_cvtepu16_epi32(
        _mm_load_si128(reinterpret_cast<const __m128i*>(intermediateValues)));
    _mm256_storeu_si256(reinterpret_cast<__m256i*>(outputBuffer), result);

    inputBuffer += bitWidth;
    outputBuffer += 8;
  }

  numValues = writeEndOffset - outputBuffer;
  unpackNaive(
      inputBuffer,
      (bitWidth * numValues + 7) / 8,
      numValues,
      bitWidth,
      outputBuffer);
}

// Unpack numValues number of uint32_t values with bitWidth = 16.
static inline void unpack16(
    const uint8_t* FOLLY_NONNULL& inputBuffer,
    uint64_t numValues,
    uint32_t* FOLLY_NONNULL& outputBuffer) {
  auto writeEndOffset = outputBuffer + numValues;

  // Process 16 bytes (8 values) a time.
  while (outputBuffer + 8 <= writeEndOffset) {
    __m256i result = _mm256_cvtepu16_epi32(
        _mm_loadu_si128(reinterpret_cast<const __m128i*>(inputBuffer)));
    _mm256_storeu_si256(reinterpret_cast<__m256i*>(outputBuffer), result);

    inputBuffer += 16;
    outputBuffer += 8;
  }

  numValues = writeEndOffset - outputBuffer;
  unpackNaive(inputBuffer, 2 * numValues, numValues, 16, outputBuffer);
}

// Unpack numValues number of uint32_t values with bitWidth in [17, 21] range.
static inline void unpack17to21(
    uint8_t bitWidth,
    const uint8_t* FOLLY_NONNULL& inputBuffer,
    uint64_t numValues,
    uint32_t* FOLLY_NONNULL& outputBuffer) {
  uint64_t pdepMask = kPdepMask32[bitWidth];

  uint8_t rightShift1 = bitWidth * 2;
  uint8_t leftShift1 = 64 - rightShift1;
  uint8_t rightShift2 = bitWidth * 4 - 64;
  uint8_t rightShift3 = bitWidth * 6 - 64;
  uint8_t leftShift3 = 128 - bitWidth * 6;

  alignas(16) uint64_t values[4] = {0, 0, 0, 0};

  auto writeEndOffset = outputBuffer + numValues;

  // Process bitWidth bytes (8 values) a time.
  while (outputBuffer + 8 <= writeEndOffset) {
    std::memcpy(values, inputBuffer, bitWidth);

    *reinterpret_cast<uint64_t*>(outputBuffer) = _pdep_u64(values[0], pdepMask);
    *(reinterpret_cast<uint64_t*>(outputBuffer) + 1) = _pdep_u64(
        (values[0] >> rightShift1) | (values[1] << leftShift1), pdepMask);
    *(reinterpret_cast<uint64_t*>(outputBuffer) + 2) =
        _pdep_u64(values[1] >> rightShift2, pdepMask);
    *(reinterpret_cast<uint64_t*>(outputBuffer) + 3) = _pdep_u64(
        (values[1] >> rightShift3) | (values[2] << leftShift3), pdepMask);

    inputBuffer += bitWidth;
    outputBuffer += 8;
  }

  numValues = writeEndOffset - outputBuffer;
  unpackNaive(
      inputBuffer,
      (bitWidth * numValues + 7) / 8,
      numValues,
      bitWidth,
      outputBuffer);
}

// Unpack numValues number of uint32_t values with bitWidth in [22, 31] range.
static inline void unpack22to31(
    uint8_t bitWidth,
    const uint8_t* FOLLY_NONNULL& inputBuffer,
    uint64_t numValues,
    uint32_t* FOLLY_NONNULL& outputBuffer) {
  uint64_t pdepMask = kPdepMask32[bitWidth];

  uint8_t rightShift1 = bitWidth * 2;
  uint8_t leftShift1 = 64 - rightShift1;
  uint8_t rightShift2 = bitWidth * 4 - 64;
  uint8_t leftShift2 = 2 * 64 - bitWidth * 4;
  uint8_t rightShift3 = bitWidth * 6 - 2 * 64;
  uint8_t leftShift3 = 3 * 64 - bitWidth * 6;

  alignas(16) uint64_t values[4] = {0, 0, 0, 0};

  auto writeEndOffset = outputBuffer + numValues;

  // Process bitWidth bytes (8 values) a time.
  while (outputBuffer + 8 <= writeEndOffset) {
    std::memcpy(values, inputBuffer, bitWidth);

    *reinterpret_cast<uint64_t*>(outputBuffer) = _pdep_u64(values[0], pdepMask);
    *(reinterpret_cast<uint64_t*>(outputBuffer) + 1) = _pdep_u64(
        (values[1] << leftShift1) | (values[0] >> rightShift1), pdepMask);
    *(reinterpret_cast<uint64_t*>(outputBuffer) + 2) = _pdep_u64(
        (values[2] << leftShift2) | (values[1] >> rightShift2), pdepMask);
    *(reinterpret_cast<uint64_t*>(outputBuffer) + 3) = _pdep_u64(
        (values[3] << leftShift3) | (values[2] >> rightShift3), pdepMask);

    inputBuffer += bitWidth;
    outputBuffer += 8;
  }

  numValues = writeEndOffset - outputBuffer;
  unpackNaive(
      inputBuffer,
      (bitWidth * numValues + 7) / 8,
      numValues,
      bitWidth,
      outputBuffer);
}

// Unpack numValues number of uint16_t values with bitWidth = 16
static inline void unpack32(
    const uint8_t* FOLLY_NONNULL& inputBuffer,
    uint64_t numValues,
    uint32_t* FOLLY_NONNULL& outputBuffer) {
  uint64_t numBytes = numValues * 4;
  std::memcpy(outputBuffer, inputBuffer, numBytes);

  inputBuffer += numBytes;
  outputBuffer += numValues;
}

#endif

template <>
inline void unpack<uint8_t>(
    const uint8_t* FOLLY_NONNULL& inputBits,
    uint64_t inputBufferLen,
    uint64_t numValues,
    uint8_t bitWidth,
    uint8_t* FOLLY_NONNULL& result) {
  VELOX_CHECK(bitWidth >= 1 && bitWidth <= 8);
  VELOX_CHECK(inputBufferLen * 8 >= bitWidth * numValues);

#if XSIMD_WITH_AVX2

  uint64_t mask = kPdepMask8[bitWidth];
  auto writeEndOffset = result + numValues;

  // Process bitWidth bytes (8 values) a time. Note that for bitWidth 8, the
  // performance of direct memcpy is about the same as this solution.
  while (result + 8 <= writeEndOffset) {
    // Using memcpy() here may result in non-optimized loops by clong.
    uint64_t val = *reinterpret_cast<const uint64_t*>(inputBits);
    *(reinterpret_cast<uint64_t*>(result)) = _pdep_u64(val, mask);
    inputBits += bitWidth;
    result += 8;
  }

  numValues = writeEndOffset - result;
  unpackNaive(
      inputBits, (bitWidth * numValues + 7) / 8, numValues, bitWidth, result);

#else

  unpackNaive<uint8_t>(inputBits, inputBufferLen, numValues, bitWidth, result);

#endif
}

template <>
inline void unpack<uint16_t>(
    const uint8_t* FOLLY_NONNULL& inputBits,
    uint64_t inputBufferLen,
    uint64_t numValues,
    uint8_t bitWidth,
    uint16_t* FOLLY_NONNULL& result) {
  VELOX_CHECK(bitWidth >= 1 && bitWidth <= 16);
  VELOX_CHECK(inputBufferLen * 8 >= bitWidth * numValues);

#if XSIMD_WITH_AVX2

  switch (bitWidth) {
    case 1:
    case 2:
    case 3:
    case 4:
      unpack1to4(bitWidth, inputBits, numValues, result);
      break;
    case 5:
    case 6:
    case 7:
      unpack5to8(bitWidth, inputBits, numValues, result);
      break;
    case 8:
      unpack8_cast(inputBits, numValues, result);
      break;
    case 9:
    case 11:
    case 13:
    case 15:
    case 10:
    case 12:
    case 14:
      unpack9to15(bitWidth, inputBits, numValues, result);
      break;
    case 16:
      unpack16(inputBits, numValues, result);
      break;
    default:
      VELOX_UNREACHABLE("invalid bitWidth");
  }
#else

  unpackNaive<uint16_t>(inputBits, inputBufferLen, numValues, bitWidth, result);

#endif
}

template <>
inline void unpack<uint32_t>(
    const uint8_t* FOLLY_NONNULL& inputBits,
    uint64_t inputBufferLen,
    uint64_t numValues,
    uint8_t bitWidth,
    uint32_t* FOLLY_NONNULL& result) {
  VELOX_CHECK(bitWidth >= 1 && bitWidth <= 32);
  VELOX_CHECK(inputBufferLen * 8 >= bitWidth * numValues);

#if XSIMD_WITH_AVX2

  switch (bitWidth) {
    case 1:
    case 2:
    case 3:
    case 4:
    case 5:
    case 6:
    case 7:
      unpack1to7(bitWidth, inputBits, numValues, result);
      break;
    case 8:
      unpack8(inputBits, numValues, result);
      break;
    case 9:
    case 10:
    case 11:
    case 12:
    case 13:
    case 14:
    case 15:
      unpack9to15(bitWidth, inputBits, numValues, result);
      break;
    case 16:
      unpack16(inputBits, numValues, result);
      break;
    case 17:
    case 18:
    case 19:
    case 20:
    case 21:
      unpack17to21(bitWidth, inputBits, numValues, result);
      break;
    case 22:
    case 23:
    case 24:
    case 25:
    case 26:
    case 27:
    case 28:
    case 29:
    case 30:
    case 31:
      unpack22to31(bitWidth, inputBits, numValues, result);
      break;
    case 32:
      unpack32(inputBits, numValues, result);
      break;
    default:
      VELOX_UNREACHABLE("invalid bitWidth");
  }

#else

  unpackNaive<uint32_t>(inputBits, inputBufferLen, numValues, bitWidth, result);

#endif
}

// Loads a bit field from 'ptr' + bitOffset for up to 'bitWidth' bits. makes
// sure not to access bytes past lastSafeWord + 7. The definition is put here
// because it's inlined.
inline uint64_t safeLoadBits(
    const char* FOLLY_NONNULL ptr,
    int32_t bitOffset,
    uint8_t bitWidth,
    const char* FOLLY_NONNULL lastSafeWord) {
  VELOX_DCHECK_GE(7, bitOffset);
  VELOX_DCHECK_GE(56, bitWidth);
  if (ptr < lastSafeWord) {
    return *reinterpret_cast<const uint64_t*>(ptr) >> bitOffset;
  }
  int32_t byteWidth =
      facebook::velox::bits::roundUp(bitOffset + bitWidth, 8) / 8;
  return facebook::velox::bits::loadPartialWord(
             reinterpret_cast<const uint8_t*>(ptr), byteWidth) >>
      bitOffset;
}

} // namespace facebook::velox::dwio::common
